从项目的一个 panic 说起:Go 中 Sync 包的分析应用
The following article is from 架构技术漫谈 Author 平平
项目开发中遇到一个错误 “fatal error: concurrent map read and map write”。
有过一两年 Golang 开发经验的同学应该都不陌生,这是 Golang 内建的 map 并发非安全 导致的。
解决办法也不复杂 --- 用 sync.map 代替了内建的 map 。
//初始化,多维度数据分开
s.pmap = new(sync.Map)
s.tmap = new(sync.Map)
s.bmap = new(sync.Map)
……
//存储数据,数据类型比较灵活
for _, param := range *** {
s.pmap.Store(param.Name, param)
if param.Type == *** {
s.tmap.Store(param.ID, ***)
} else {
s.bmap.Store(param.ID, ***)
}
}
……
//获取数据
data, ok := s.pmap.Load(name)
sync.map 包是 Golang 1.9 之后官方支持的并发安全的 map 包,之所以放在 sync 包里,是因为 sync 包里包含了 Golang 语言的绝大部分同步原语,它是 Golang 并发编程中占据重要地位的一个包。
其实 Golang1.9 之前这个问题的解决办法是给内建的 map 加锁 sync.RWMutex 等,也在 sync 包里。
Golang 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程都离不开锁。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争或同时写导致的各种不一致混乱及异常等问题。
我们从 sync 包的源码目录结构可以看出,sync 包主要提供了 Mutex、RWmutex、WaitGroup、Map、Once、Cond、Pool、PoolQueue等同步原语。
下面我们会逐个分析其作用及使用注意事项(选用了一些 Golang 经典面试题中常见的一些错误),我们集中看使用场景比较多的几个。sync 包中还有个 atomic 目录,是 Golang 提供一些的底层的原子操作,不是我们今天讨论的内容,感兴趣的同学可以自行查看源码。
一、Sync.Mutex
Sync.Mutex 互斥锁,它用来保证在任一时刻,只能有一个 Goroutine 访问共享资源,即互斥访问。
互斥锁提供两个方法,加锁 Lock()和 解锁 Unlock()。
sync.Mutex 初始值为 Unlock 状态,并且 sync.Mutex 常做为其它结构体的匿名变量使用。
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
……
func (m *Mutex) Lock() //锁住 m,如果 m 已经被加锁,Lock 将会引发 panic;
func (m *Mutex) Unlock() //解锁 m,如果 m 未加锁,Unlock 也会引发 panic,unlock of unlocked mutex;
1. 资源有锁时必须先解锁后才能再加锁,资源没锁时必须先加锁才能解锁。
看下面这段代码
var mu sync.Mutex
var chain string
func main() {
chain = "main"
A()
fmt.Println(chain)
}
func A() {
mu.Lock() //mu.Unlock
defer mu.Unlock()
chain = chain + " --> A"
B()
}
func B() {
chain = chain + " --> B"
C()
}
func C() {
mu.Lock()
defer mu.Unlock()
chain = chain + " --> C"
}
……
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_SemacquireMutex(0x514b7c)
/usr/lib/golang/src/runtime/sema.go:62 +0x34
sync.(*Mutex).Lock(0x514b78)
/usr/lib/golang/src/sync/mutex.go:87 +0x9d
main.C()
如果将函数 A 中 mu.Lock 改为 mu.Unlock()
fatal error: sync: unlock of unlocked mutex
goroutine 1 [running]:
runtime.throw(0x4a8a4c, 0x1e)
/usr/lib/golang/src/runtime/panic.go:596 +0x95 fp=0xc42005dea8 sp=0xc42005de88
sync.throw(0x4a8a4c, 0x1e)
/usr/lib/golang/src/runtime/panic.go:585 +0x35 fp=0xc42005dec8 sp=0xc42005dea8
sync.(*Mutex).Unlock(0x514b78)
/usr/lib/golang/src/sync/mutex.go:113 +0xa4 fp=0xc42005def0 sp=0xc42005dec8
main.A()
函数 A 执行时先对 mu 加了锁,而后进入函数 B ,函数 B 又进入函数 C,整个过程中并未对 mu 进行解锁,这时又对 mu 进行加锁是不行的。
使用 Lock() 加锁后,不能再继续对其加锁,直到利用 Unlock() 解锁后才能再加锁。
同理,未加锁时直接在函数 A 中对 mu 进行解锁也是不行的,必须是先加所再解锁成对出现的。
2. sync.Mutex 最好通过指针进行传递。(sync包的所有同步原语都一样)
看下面一段代码
type data struct {
mu sync.Mutex
}
func (d data) test(s string) {
fmt.Printf("mu address %p \n",&(d.mu))
d.mu.Lock()
defer d.mu.Unlock()
for i:=0;i<5 ;i++ {
fmt.Println(s,i)
time.Sleep(time.Second)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
var d data
go func() {
defer wg.Done()
d.test("read")
}()
go func() {
defer wg.Done()
d.test("write")
}()
wg.Wait()
}
……
mu address 0xc42000e1f8 //锁的地址不一样了,所以锁机制失效了。本意是要通过同一把锁来对共享资源进行控制的。
write 0
mu address 0xc42009e000
read 0
read 1
write 1
write 2
read 2
read 3
write 3
write 4
read 4
sync.Mutex 通过函数参数传递或者 struct 结构体中值传递时是值传递,会复制一个副本出来,可以看上面例子中的两个锁的地址是完全不同的。
所以本意通过同一把锁来对共享资源进行控制的时候通过值传递复制一个新的锁出来就达不到想要的效果,锁失效。
所以传递 Mutex 时,必须用指针进行传递,或者相关的方法必须使用指针接收者。
type data struct {
*mu sync.Mutex //使用前记得初始化 data{new(sync.Mutex)}
}
或者
func (d *data) test(s string) { //指针接收者
}
3. 在第一次被使用后,不能再对sync.Mutex进行复制(sync包的所有原语都一样)。
type MyMutex struct {
count int
sync.Mutex
}
func main() {
var mu MyMutex
mu.Lock()
var mu1 = mu //加锁后复制了一个新的Mutex出来,此时 mu1 跟 mu的锁状态一致,都是加锁的状态
mu.count++
mu.Unlock()
mu1.Lock()
mu1.count++
mu1.Unlock()
fmt.Println(mu.count, mu1.count)
}
……
fatal error: all goroutines are asleep - deadlock!
复制会连带 Mutex 的状态一起复制,所以 mu1 其实跟复制时 mu 的状态一样,已经是加锁状态,下面再加锁就会死锁。
二、Sync.RWMutex
sync.RWMutex 是一个读写互斥锁,它是为了保证多个 Goroutine 可以同时读取某一个共享资源,但只有一个 Goroutine 能够更新资源,也就是说可以同时读,不能同时写,也不能同时读写,即读和写是互斥的,写和写也是互斥的,读和读是不互斥的。
它主要用于读多写少的场景,支持 Lock、Unlock、RLock、RUnlock 四个方法,除了“写锁定”和“写解锁”状态外,多了一个“读锁定”和“读解锁“。
func (rw *RWMutex) Lock() //写锁定 rw,禁止其他 Goroutine 读取和写入。如果 rw 已经被写锁定,仍然需要先解锁才可以再次锁定
func (rw *RWMutex) Unlock() //写解锁 rw,如果 rw 未被写锁定,Unlock 也会引发 panic;
func (rw *RWMutex) RLock() //读锁定 rw,如果 rw 已经被读锁定,禁止其他 Goroutine 写入,但可以读取;读锁可以连续锁定,不一定非要先解读锁再锁定,因为读和读是不互斥的。
func (rw *RWMutex) RUnlock() //读解锁 rw,如果 rw 未被读锁定,RUnlock 会引发 panic;
这里体现了不同,sync.RWMutex 允许至少一个读锁或一个写锁存在,即可以有多个读锁。而 sync.Mutex 只允许一个读锁或一个写锁存在。
看下面这段代码
var mu sync.RWMutex
var count int
func main() {
go A()
time.Sleep(2 * time.Second)
mu.Lock()
defer mu.Unlock()
count++
fmt.Println(count)
}
func A() {
fmt.Println("enter A")
mu.RLock()
fmt.Println("A after RLock")
defer mu.RUnlock()
B()
}
func B() {
fmt.Println("enter B")
time.Sleep(5 * time.Second)
C()
}
func C() {
fmt.Println("enter C")
mu.RLock() //main中 mu.Lock 写锁定了,这里就死锁了。
defer mu.RUnlock()
}
……
enter A
A after RLock
enter B
enter C
fatal error: all goroutines are asleep - deadlock!
RWMutex 为了有效防止写锁饥饿,当写锁阻塞时,新的读锁是无法申请的,main 函数中进行了写锁定,且用了 defer 要到 main 函数执行完时才会解锁,此时 A 协程执行到 C 时重新申请读锁定是不允许的,导致整个程序死锁。
注意代码中有两行 time 控制,是为了保证 main 和 子协程执行的时机的,方便控制锁的先后时机。感兴趣的同学可以自行注释掉或者调整 Sleep 的时间试验下。
三、Sync.WaitGroup
sync.WaitGroup 组等待,它的使用场景是等待一组 Goroutine 执行完成。
其内部拥有一个计数器,每个 Goroutine 启动时加 1,运行结束时减 1,主程序通过检测计数器来判断整组 Goroutine 是否执行完毕。
我们使用sync.WaitGroup 时主要用到Add、Done、Wait 三个方法。
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy //第一次使用后不可复制
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
func (wg *WaitGroup) Add(delta int) //整组任务开始时按照任务数量设定计数器值,或者在每个 Goroutine 中单独加1也可以;
func (wg *WaitGroup) Done() //Goroutine 执行完成,即完成其中一个任务,计数器减1;
func (wg *WaitGroup) Wait(): //主程序等待,直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
看下面一段代码
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("1")
wg.Done()
wg.Add(1)
}()
wg.Wait()
}
……
1
panic: sync: WaitGroup is reused before previous Wait has returned
main 中设定了整组任务只有一个,然后在 Goroutine 中进行了 Done 操作,这时主程序中 wait 判断计数器清零了就返回了,这时对已经返回的 WaitGroup 再次设定 Add 导致 panic。
func main() {
var wg sync.WaitGroup
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
go getData(i, wg, ch)
}
wg.Wait()
fmt.Println("all done")
for i := 0; i < 10; i++ {
counter := <-ch
fmt.Println(counter," inform done")
}
}
func getData(index int, wg sync.WaitGroup, ch chan int) {
//fmt.Printf("wg address: %p \n",&wg)
defer wg.Done()
fmt.Println(index," get data done")
ch <- index
}
……
2 get data done
0 get data done
1 get data done
9 get data done
3 get data done
6 get data done
4 get data done
8 get data done
7 get data done
5 get data done
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc42009c04c)
/usr/lib/golang/src/runtime/sema.go:47 +0x34
sync.(*WaitGroup).Wait(0xc42009c040)
/usr/lib/golang/src/sync/waitgroup.go:131 +0x7a
……
why?为啥死锁了呐?程序执行完了,但是 wait 的地方却死锁?计数器哪里出问题了吗?还是说每个 getData 里用的都不是同一个 wg?
接下来我们把 getData 函数中第一行注释掉的打印 wg 地址的内容打开看下
wg address: 0xc42000e230
wg address: 0xc42009e000
9 get data done
wg address: 0xc42000e260
3 get data done
7 get data done
wg address: 0xc42009e040
wg address: 0xc42000e290
0 get data done
8 get data done
wg address: 0xc42009e060
wg address: 0xc42000e2b0
2 get data done
1 get data done
wg address: 0xc42000e2e0
wg address: 0xc4200a4000
5 get data done
6 get data done
wg address: 0xc4200c4000
4 get data done
果然,wg 地址完全不同,每次都复制了一个新的,这时再回头看讲 Mutex 时我们提到过同步原语需要用指针传递。说干就干……
func main() {
var wg sync.WaitGroup
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
go getData(i, &wg, ch)
}
wg.Wait()
fmt.Println("all done")
for i := 0; i < 10; i++ {
counter := <-ch
fmt.Println(counter," inform done")
}
}
func getData(index int, wg *sync.WaitGroup, ch chan int) {
fmt.Printf("wg address: %p \n",wg)
defer wg.Done()
fmt.Println(index," get data done")
ch <- index
}
……
wg address: 0xc42000e220
wg address: 0xc42000e220
9 get data done
3 get data done
wg address: 0xc42000e220
7 get data done
wg address: 0xc42000e220
wg address: 0xc42000e220
0 get data done
8 get data done
wg address: 0xc42000e220
wg address: 0xc42000e220
wg address: 0xc42000e220
1 get data done
2 get data done
wg address: 0xc42000e220
6 get data done
5 get data done
wg address: 0xc42000e220
4 get data done
all done
9 inform done
3 inform done
7 inform done
0 inform done
8 inform done
1 inform done
2 inform done
6 inform done
5 inform done
4 inform done
完美,同一个 wg 地址是相同的,锁生效了。
四、Sync.Once
sync.Once 的作用是多次调用但只执行一次。Once 只有一个方法 Do(),向 Do 传入一个函数,这个函数在第一次执行 Once.Do() 的时候会被调用,以后再执行 Once.Do() 将没有任何动作,即使传入了其它的函数,也不会被执行,如果要执行其它函数,需要重新创建一个 Once 对象。
// Once is an object that will perform exactly one action.
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/x86),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
func (o *Once) Do(f func()) //多次调用但只执行一次
看下面一段代码,4次调用中只执行了一次,只打印了first 0
func main(){
once := &sync.Once{}
for i := 0; i < 4; i++ {
i := i
go func() {
once.Do(func() {
fmt.Printf("first %d\n", i)
})
}()
}
time.Sleep(3*time.Second)
}
……
first 0
结合到实际项目应用中,我取了个巧,利用它来做单例模式了,用起来还挺方便的。
var (
once sync.Once
s *dataService
)
func GetInstanceDataService() *dataService {
once.Do(func() {
s = &dataService{}
})
return s
}
五、Sync.Map
接下来我们说回开篇的并发安全的 Map。sync.Map先上一段源码
type Map struct {
mu Mutex //互斥锁,用于锁定dirty map
read atomic.Value //优先读read map,支持原子操作
dirty map[interface{}]*entry // 空间换时间,多了一个dirty map,是一个当前最新的map,允许读写
misses int // 主要记录read读取不到数据加锁读取read map以及dirty map的次数,当misses等于dirty的长度时,会将dirty复制到read
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
func (m *Map) Load(key interface{}) (value interface{}, ok bool) //查询一个key,可以通过 ok 值来判断是否存在
func (m *Map) Store(key, value interface{}) //存储 keyvalue 键值对
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) //检索或添加之前不存在的元素。如果键之前在 map 中存在就返回key的实际值和true,不需要添加了。
func (m *Map) Delete(key interface{}) //删除元素,如果元素不存在也不会引发异常。
func (m *Map) Range(f func(key, value interface{}) bool) //按特定函数规则或条件遍历 map,仍然是无序的
可以看到 sync.Map 支持 Load、Store、LoadOrStore、Delete、Range 五个方法,开篇我们也结合项目看了用法了。
进一步仔细阅读源码发现 sync.Map 与内建的 Map 相比 之所以并发安全效率又高,主要是以下几个特点:
以空间换效率,通过read和dirty两个map来提高读取效率 优先从read map中读取(无锁),否则再从dirty map中读取(加锁) 动态调整,当misses次数过多时,将dirty map提升为read map 延迟删除,删除只是为value打一个标记,在dirty map提升时才执行真正的删除
这块儿具体的原理参见参考文献,这里不多说了。
需要注意 sync.Map 没有 len 方法。
func main() {
var m sync.Map
m.LoadOrStore("a", 1)
m.Delete("a")
fmt.Println(m.Len())
}
……
./testsyncmap.go:13:18: m.Len undefined (type sync.Map has no field or method Len)
今天我们结合项目中编码示例和一些面试题的坑点一起分析了 Golang 中 sync 包的一些常用同步原语的使用及注意事项:
sync 包中的所有同步原语,尽量用指针 所有同步原语首次使用后不能复制 注意相关锁的状态及互斥性
以上表述不当的地方还望大家见谅并及时批评指正~~
【参考文献】
https://colobu.com/2017/07/11/dive-into-sync-Map/
推荐阅读